TruFactor COVID-19 Geo Mobility

TruFactor’s Geographic Mobility Dataset aims to provide a national and regional understanding of mobility changes across a significant portion of the United States while maintaining individual privacy. It is intended to enable researchers, public officials, and citizens to understand how population-level behaviors are changing due to COVID-19 and the ensuing national, state, and local response. Two datasets are being produced, which may be used independently or in tandem to analyze how population-level mobility behaviors have changed.

The first dataset, TruFactor's InterCounty Mobility dataset, provides intercounty travel patterns at a daily frequency by estimating where a resident population is traveling. It can aid in understanding both the export and import flows of potentially infected populations. One example use would be to model how a virus could spread over time; another could be modeling economic impacts due to reduced mobility.

The second dataset, TruFactor's IntraCounty Mobility dataset, provides intracounty estimates of how mobility behaviors are changing by demographic group. It measures the change in travel behavior for a county's residents by providing a decile distribution of the number of census tracts visited within the county. Travel behaviors for residents within a county are broken out by age group at a daily frequency, so researchers and policymakers can see how behaviors change in response to infection rates and policy shocks such as Stay At Home orders.

In this tutorial we show how to work with free samples of the InterCounty Mobility dataset published on AWS Data Exchange (https://aws.amazon.com/data-exchange/). We walk through several applications and show how these datasets can power vertical-oriented applications.
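To make the InterCounty schema concrete before loading the real files: each record carries a PercentVisited share, i.e. the fraction of a home county's residents observed in the visited county. A minimal pandas sketch (FIPS codes, shares, and the population figure below are all made up for illustration) shows how a share converts into an estimated visitor count:

```python
import pandas as pd

# Illustrative InterCounty-style rows: PercentVisited is the share of the
# home county's residents seen in the visited county on a given day.
flows = pd.DataFrame({
    "HomeCounty":     ["21077", "21077"],
    "VisitedCounty":  ["21191", "21103"],
    "PercentVisited": [0.0224, 0.0031],
})
home_pop = {"21077": 8_000}  # hypothetical resident population

# Share of residents * resident population = estimated visitor count.
flows["EstimatedVisitors"] = flows.apply(
    lambda r: r.PercentVisited * home_pop[r.HomeCounty], axis=1
)
print(flows)
```

The real tutorial below performs the same multiplication at scale in Spark, joining the flows to Census county population estimates.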

Step 0: Import Libraries

In [31]:
import findspark
findspark.init()
from pyspark import SparkConf,AccumulatorParam, SparkContext
from pyspark.sql import HiveContext, Column, Window, Row, DataFrame
from pyspark.sql.types import *
import pyspark.sql.functions as fns
from pyspark.sql.functions import col, lit
import pyspark

import datetime

import bokeh
import holoviews as hv
import sys
from holoviews import opts

hv.extension('bokeh')
%matplotlib inline
from shapely import geometry as gm
import numpy as np
import pandas as pd
import datashader as ds
import geopandas as gpd
import geoviews as gv
import seaborn as sns
import matplotlib.pyplot as plt
import os

from pyspark.sql import SparkSession

# Create the SparkSession referenced as `spark` throughout the notebook.
spark = SparkSession.builder.getOrCreate()

Step 1: Measure Disease Spread

Let's see how travel behaviors can influence the spread of the virus. Using TruFactor's InterCounty Mobility dataset, one can calculate potential COVID-19 exports into an uninfected county: given a set of infected counties, it is possible to calculate inflows to non-infected counties over a period and thus adjust those counties' infection forecasts. We illustrate this point using the Johns Hopkins dataset on COVID-19.

First, we will show how mobile the US population is by mapping all the counties visited in a single day by residents of infected counties. Then we will show how seemingly small infection rates, combined with large mobility flows, can lead to quick spread of infectious disease.
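A back-of-the-envelope calculation (all numbers hypothetical) builds intuition for why small infection rates still matter at scale:

```python
# Hypothetical figures: even a very low infection rate in a large, mobile
# home county implies a steady stream of potentially infectious travelers
# into a single neighboring county.
home_population = 1_000_000
confirmed_cases = 100                 # a 0.01% infection rate
daily_visitors_to_neighbor = 50_000   # daily flow into one neighboring county

infection_rate = confirmed_cases / home_population
expected_infected_travelers = infection_rate * daily_visitors_to_neighbor
print(expected_infected_travelers)  # → 5.0 expected infectious visitors per day
```

Five expected infectious visitors per day from a county that is 99.99% uninfected is exactly the kind of flow the joins below quantify with real data.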

In [3]:
inter_path = 's3://trufactor-demos/data/covid/InterCountyMobility'
inter = spark.read.parquet(inter_path)
In [4]:
inter.show(10)
+----------+-------------+--------------------+--------------------+--------+
|HomeCounty|VisitedCounty|      PercentVisited|    PercentDeviation|      dt|
+----------+-------------+--------------------+--------------------+--------+
|     21077|        21191| 0.02238560000019313|4.931984073455829E-4|20200320|
|     47085|        21103|  0.0207723224813464|                 0.0|20200320|
|     47109|        01073|0.054524918076403917|4.300770999006458...|20200320|
|     06099|        06069|0.002459024605564307|1.780344670713895...|20200320|
|     06099|        06111|4.898166788961081E-4|3.647994331959206...|20200320|
|     29063|        29075| 0.06159407410558857|5.294122172698352E-4|20200320|
|     26065|        21067| 3.29292135154447E-4|3.474182188178881E-6|20200320|
|     26065|        17173|1.594799923772428E-4|2.444328451810528...|20200320|
|     26065|        26161|0.022733522645907576|7.733353890546475E-5|20200320|
|     13255|        13145|0.001541140915266...|2.523292534594672...|20200320|
+----------+-------------+--------------------+--------------------+--------+
only showing top 10 rows

Next we will pull down the COVID-19 dataset maintained by the Johns Hopkins CSSE group and get county shapefiles for mapping purposes.

In [7]:
%%bash
cd /home/hadoop/
git clone https://github.com/CSSEGISandData/COVID-19.git
Cloning into 'COVID-19'...
In [ ]:
%%bash 
cd /home/hadoop/
wget https://www2.census.gov/geo/tiger/TIGER2018/COUNTY/tl_2018_us_county.zip
unzip -d county_shp tl_2018_us_county.zip
In [5]:
cnty = gpd.read_file('/home/hadoop/county_shp/')
cnty.head(2)
Out[5]:
STATEFP COUNTYFP COUNTYNS GEOID NAME NAMELSAD LSAD CLASSFP MTFCC CSAFP CBSAFP METDIVFP FUNCSTAT ALAND AWATER INTPTLAT INTPTLON geometry
0 31 039 00835841 31039 Cuming Cuming County 06 H1 G4020 None None None A 1477652222 10690952 +41.9158651 -096.7885168 POLYGON ((-97.01952 42.00410, -97.01952 42.004...
1 53 069 01513275 53069 Wahkiakum Wahkiakum County 06 H1 G4020 None None None A 680956809 61588406 +46.2946377 -123.4244583 POLYGON ((-123.43639 46.23820, -123.44759 46.2...
In [6]:
import os
dat = []
pth = '/home/hadoop/COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/'
for fl in os.listdir(pth):
    if '.csv' in fl:
        d = pd.read_csv(pth+fl)
        d['dt'] = datetime.datetime.strptime(fl.split('.csv')[0],'%m-%d-%Y').strftime('%Y%m%d')
        dat.append(d)

The structure of the COVID-19 daily reports has changed over time. Ultimately we'd like to extract county-level data, so some munging is required to get a common structure across time.
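The schema change is visible in the two `value_counts()` cells below: early files use `Province/State` / `Country/Region`, later ones use `Province_State` / `Country_Region` plus an `Admin2` county column. One way to sketch the normalization (the `RENAMES` mapping and `normalize` helper are our own names, not part of the JHU data):

```python
import pandas as pd

# Map the early slash-style column names onto the later underscore style so
# that pd.concat aligns both schema generations into common columns.
RENAMES = {
    "Province/State": "Province_State",
    "Country/Region": "Country_Region",
}

def normalize(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns=RENAMES)
    if "Admin2" not in df.columns:  # early files have no county column
        df["Admin2"] = None
    return df

# Two toy rows, one per schema generation.
old = pd.DataFrame({"Province/State": ["King County, WA"],
                    "Country/Region": ["US"], "Confirmed": [1]})
new = pd.DataFrame({"Province_State": ["Washington"], "Country_Region": ["US"],
                    "Admin2": ["King"], "Confirmed": [5]})
both = pd.concat([normalize(old), normalize(new)], ignore_index=True)
print(both.columns.tolist())
```

The cells that follow do this alignment by hand, parsing the county name and state abbreviation out of the early `Province/State` strings.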

In [7]:
conf = pd.concat(dat)
In [8]:
conf['Province/State'].value_counts()
Out[8]:
Gansu                      62
Hebei                      62
Chongqing                  60
Henan                      60
Jiangxi                    60
                           ..
Ashland, NE                 1
Northwest Territories       1
Santa Cruz County, CA       1
Charlotte County, FL        1
Unassigned Location, VT     1
Name: Province/State, Length: 283, dtype: int64
In [9]:
conf['Province_State'].value_counts()
Out[9]:
Texas                    1270
Georgia                   800
Virginia                  665
Kentucky                  605
Missouri                  582
                         ... 
Wuhan Evacuee               3
Northwest Territories       2
Jervis Bay Territory        1
External territories        1
Yukon                       1
Name: Province_State, Length: 135, dtype: int64
In [10]:
## Will need abbreviation mappings
fips = pd.read_csv('state_fips.csv')
fips['FIPS'] = fips.FIPS.map(lambda x: str(x).zfill(2))

f = {}
for i in fips.itertuples():
    f[i.ST]=i.State
    
fi = {}
for i in fips.itertuples():
    fi[i.FIPS]=i.State
In [ ]:
conf['County'] = conf['Province/State'].fillna('').map(lambda x: x.split(',')[0].strip().replace(' County','') if len(x.split(',')[-1].strip())<3 else None)
conf['ST'] = conf['Province/State'].fillna('').map(lambda x: x.split(',')[-1].strip() if len(x.split(',')[-1].strip())<3 else None)
conf['State'] = conf['Province/State'].fillna('').map(lambda x: x.split(',')[-1].strip() if len(x.split(',')[-1].strip())<3 else x)
# Use .loc rather than chained indexing, which can silently fail to write.
conf.loc[conf.State.str.len()<3, 'State'] = conf['State'][conf.State.str.len()<3].map(f)
conf.loc[conf['Country/Region']=='US', 'Country_Region'] = 'US'
In [ ]:
# .copy() so the fillna assignments below write to a real frame, not a slice.
usc = conf[conf['Country_Region'].fillna('').str.contains('US')].copy()
usc['Admin2'] = usc.Admin2.fillna(usc.County)
usc['Province_State'] = usc.Province_State.fillna(usc.State)
In [13]:
keep = ['Admin2','Province_State','Lat','Long_','dt','Confirmed','Deaths','Recovered']
usc = usc[keep]
usc.columns = ['County','State','Lat','Lon','dt','Confirmed','Deaths','Recovered']
In [14]:
usc.sort_values('dt',inplace=True)
In [15]:
fig,ax = plt.subplots(figsize=(15,6))
sns.lineplot(x='dt', y='Confirmed', hue='County', data=usc[usc.County.fillna('').str.contains('Westchester')], ax=ax)
Out[15]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f23a9210550>

Step 2: Possible Infection Spread

Now that we have a dataset of COVID-19 cases, let's match it back to the dataset of intercounty travel. Given that a county has an infected population, we will presume every resident of that county has an infection probability equal to the county's infection rate. We can use this to model the potential spread of COVID-19. First, let's understand how mobile the populations of infected counties are.
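The presumption above can be sketched with a few illustrative numbers (the FIPS codes, case counts, populations, and flows below are all hypothetical): each resident of an infected home county is treated as infectious with probability Confirmed / Population, so the expected infected inflow to a visited county is the sum of rate times visitors over all infected home counties.

```python
# Toy inputs: fips -> (confirmed cases, population, visitors to target county)
infected_home_counties = {
    "06085": (2, 1_900_000, 40_000),
    "06037": (1, 10_000_000, 25_000),
}

# Expected infected inflow = sum over home counties of
#   (infection rate of home county) * (visitors sent to the target county)
expected_infected_inflow = sum(
    (confirmed / pop) * visitors
    for confirmed, pop, visitors in infected_home_counties.values()
)
print(round(expected_infected_inflow, 5))
```

The Spark joins that follow compute exactly this quantity across every county pair, using real case counts and Census population estimates.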

In [16]:
co = pd.read_csv('co-est2019-alldata.csv',encoding='latin-1')
co.STATE = co.STATE.map(lambda x: str(x).zfill(2))
co.COUNTY = co.COUNTY.map(lambda x: str(x).zfill(3))
co['pop'] = co.POPESTIMATE2019
co['county'] = co.STATE+co.COUNTY

copop = spark.createDataFrame(co[co.STNAME!=co.CTYNAME][['county','pop']])
In [17]:
interp = inter.join(copop.withColumnRenamed('county','HomeCounty'),'HomeCounty').filter('dt=20200212')
In [18]:
covid = spark.createDataFrame(usc[usc.dt=='20200212'][['dt','Confirmed','County','State']])
In [19]:
cfips = pd.read_csv('county_fips.csv',dtype=str)
In [20]:
cfips.State = cfips.State.map(f)
cfips.FIPS = cfips.FIPS.map(lambda x: str(x).zfill(5))
cfips.head(3)
Out[20]:
FIPS County State
0 01001 Autauga Alabama
1 01003 Baldwin Alabama
2 01005 Barbour Alabama
In [21]:
cfips = spark.createDataFrame(cfips.fillna(''))
In [22]:
interp1 = (interp.join(cfips,interp.HomeCounty==cfips.FIPS)
                .drop(interp.HomeCounty)
          )
In [23]:
k = fns.when(col('PercentVisited')>1,lit(1)).otherwise(col('PercentVisited'))

interp2 = (interp1.join(covid,['County','State','dt'],'left')
        .select('dt',
            col('County').alias('HomeCounty'),
            col('State').alias('HomeState'),
            'VisitedCounty',
            (k*col('pop')).alias('NumberVisited'),
            'Confirmed',
            col('pop').alias('HomePop')
           )
)
In [24]:
interp3 = (interp2.join(copop.withColumnRenamed('county','VisitedCounty')
                            .withColumnRenamed('pop','VisitedPop'),'VisitedCounty')
           .filter('dt=20200212')
           .join(cfips,interp2.VisitedCounty==cfips.FIPS)
           .drop('VisitedCounty')
           .withColumnRenamed('County','VisitedCounty')
           .withColumnRenamed('State','VisitedState')           
          )
In [25]:
interp3.cache()
interp3.count()
Out[25]:
271989
In [26]:
interp3.show(5)
+--------+----------+---------+------------------+---------+-------+----------+-----+-------------+------------+
|      dt|HomeCounty|HomeState|     NumberVisited|Confirmed|HomePop|VisitedPop| FIPS|VisitedCounty|VisitedState|
+--------+----------+---------+------------------+---------+-------+----------+-----+-------------+------------+
|20200212|   Weakley|Tennessee| 84.43681327734986|     null|  33328|    108669|17099|     La Salle|    Illinois|
|20200212|   Douglas| Nebraska| 184.6818135050266|     null| 571327|    108669|17099|     La Salle|    Illinois|
|20200212|Stephenson| Illinois| 81.30852459757403|     null|  44498|    108669|17099|     La Salle|    Illinois|
|20200212|  St Louis| Missouri|100.09045038254075|     null| 994205|    108669|17099|     La Salle|    Illinois|
|20200212| Kalamazoo| Michigan| 38.24063309348709|     null| 265066|    108669|17099|     La Salle|    Illinois|
+--------+----------+---------+------------------+---------+-------+----------+-----+-------------+------------+
only showing top 5 rows

In [27]:
interp3.groupby('HomeCounty').agg(fns.max('Confirmed').alias('cases')).orderBy(col('cases').desc()).limit(5).show()
+-----------+-----+
| HomeCounty|cases|
+-----------+-----+
| San Benito|  2.0|
|Santa Clara|  2.0|
|     Orange|  1.0|
|Los Angeles|  1.0|
|  San Diego|  1.0|
+-----------+-----+

In [28]:
vcts = interp3.filter('Confirmed>0').select('FIPS').distinct().collect()
In [32]:
from shapely import geometry as gm
def get_geos(county_fips,val='none'):
    std = {}
    for i in cnty[cnty.GEOID.isin([i.FIPS for i in county_fips])].itertuples():
            chng = 0 if val=='none' else [j[val] for j in county_fips if j.FIPS==i.GEOID][0]
            if i.geometry.type=='Polygon' and len(gm.mapping(i.geometry)['coordinates'])==1:
                s = np.array(gm.mapping(i.geometry.simplify(.001))['coordinates'][0])
                lng,lat = ds.utils.lnglat_to_meters(s[:,0],s[:,1])        
                std[i.NAME] = {'y':lat.tolist(),'x':lng.tolist(),'ST':i.NAME,'change':chng}

            elif i.geometry.type=='Polygon' and len(gm.mapping(i.geometry)['coordinates'])>1:
                holes = []
                for n,c in enumerate(gm.mapping(i.geometry.simplify(.001))['coordinates']):
                    if n==0:
                        s = np.array(c)
                        lng,lat = ds.utils.lnglat_to_meters(s[:,0],s[:,1])
                    else:
                        s = np.array(c)
                        ln,lt = ds.utils.lnglat_to_meters(s[:,0],s[:,1])
                        holes.append([list(i) for i in zip(ln,lt)])
                #std[i.NAME] = {'lat':lat,'lng':lng,'ST':i.NAME,'holes':holes,'change':chng}
                std[i.NAME] = {'y':lat.tolist(),'x':lng.tolist(),'ST':i.NAME,'holes':[holes],'change':chng}
            else:
                holes = []
                for n,c in enumerate(gm.mapping(i.geometry.simplify(.001))['coordinates']):
                    if len(c)>1:
                        holes = []
                        for n,k in enumerate(c):
                            if n==0:
                                s = np.array(k)
                                lng,lat = ds.utils.lnglat_to_meters(s[:,0],s[:,1])
                            else:
                                s = np.array(k)
                                ln,lt = ds.utils.lnglat_to_meters(s[:,0],s[:,1])
                                holes.append([list(i) for i in zip(ln,lt)])
                        std[i.NAME] = {'y':lat.tolist(),'x':lng.tolist(),'ST':i.NAME,'holes':[holes],'change':chng}
                    else:
                        s = np.array(c[0])
                        lng,lat = ds.utils.lnglat_to_meters(s[:,0],s[:,1])
                        #std[i.NAME+str(n)] = {'lat':lat,'lng':lng,'ST':i.NAME,'holes':[],'change':chng}
                        std[i.NAME+str(n)] = {'y':lat.tolist(),'x':lng.tolist(),'ST':i.NAME,'change':chng}
    return std

std = get_geos(vcts)
In [33]:
hv.element.tiles.CartoLight()*hv.Polygons([v for k,v in std.items()],vdims=['ST']).opts(opts.Polygons(width=700, height=500,  tools=['hover', 'tap'],xaxis=None,color_index='change',clabel='change',
                         cmap="Reds",alpha=.5,
                  yaxis=None))    
Out[33]:

Step 3: Model Potential Spread

Given that residents of infected counties travel to noninfected counties, we'd like to see how disease transmission could occur. Intercounty mobility lets us compute a noninfected county's potential infected inflow as the sum of infection probabilities over residents of infected counties traveling in. This estimate can then be used to compute the potential infected population and incorporated into an SIR model.

In [34]:
probab_infected = (col('Confirmed')/col('HomePop'))*col('NumberVisited')

infected = (interp3.groupby('VisitedCounty','VisitedState')
    .agg(fns.sum(probab_infected).alias('infected'))
    .orderBy(col('infected').desc())
    .toPandas()
)

infectors = interp3.filter('Confirmed>0').select('HomeCounty','HomeState').distinct().toPandas()

inf = infected.merge(infectors,left_on=['VisitedCounty','VisitedState'],right_on=['HomeCounty','HomeState'],how='left')
In [35]:
inf = inf[inf.HomeCounty.isnull()]
inf['Location'] = inf.VisitedCounty+', '+inf.VisitedState
fig,ax = plt.subplots(figsize=(10,6))
ax.set_title('Expected Infected Persons')
sns.barplot(x='infected', y='Location', data=inf[inf.infected>0.01][['Location','infected']], orient="h", ax=ax)
Out[35]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f23a3dd51d0>
In [36]:
infe = spark.createDataFrame(inf[inf.infected>0.01])
vcts = cfips.join(infe,((infe.VisitedCounty==cfips.County) & (infe.VisitedState==cfips.State))).select('FIPS','infected').collect()
In [37]:
std = get_geos(vcts,'infected')
hv.element.tiles.CartoLight()*hv.Polygons([v for k,v in std.items()],vdims=['ST','change']).opts(opts.Polygons(width=700, height=500,  tools=['hover', 'tap'],xaxis=None,color_index='change',clabel='change',
                         cmap="Blues",alpha=.5,colorbar=True,
                  yaxis=None))    
Out[37]:
In [38]:
co[co.CTYNAME.str.contains('Pima')].POPESTIMATE2019
Out[38]:
109    1047279
Name: POPESTIMATE2019, dtype: int64


We can now model the rate of infection and hospitalization for Pima County, AZ. We use the following parameters to model the rate of spread of disease in an SIR epidemic model:

- Initial infected count of 0.01608 persons
- Population of 1,047,279, of which 50% are susceptible
- Contact rate of 2.5 persons
- Mean recovery time of 14 days

In [30]:
#https://scipython.com/book/chapter-8-scipy/additional-examples/the-sir-epidemic-model/

%matplotlib inline
import numpy as np
from scipy.integrate import odeint
import matplotlib.pyplot as plt

# Total population, N.
N = 1047279*.5
# Initial number of infected and recovered individuals, I0 and R0.
I0, R0 = .01608, 0
# Everyone else, S0, is susceptible to infection initially.
S0 = N - I0 - R0
# Contact rate, beta, and mean recovery rate, gamma, (in 1/days).
beta, gamma = 1/2.5, 1./14
# A grid of time points (in days)
t = np.linspace(0, 160, 160)

# The SIR model differential equations.
def deriv(y, t, N, beta, gamma):
    S, I, R = y
    dSdt = -beta * S * I / N
    dIdt = beta * S * I / N - gamma * I
    dRdt = gamma * I
    return dSdt, dIdt, dRdt

# Initial conditions vector
y0 = S0, I0, R0
# Integrate the SIR equations over the time grid, t.
ret = odeint(deriv, y0, t, args=(N, beta, gamma))
S, I, R = ret.T

# Plot the data on three separate curves for S(t), I(t) and R(t)
fig = plt.figure(facecolor='w', figsize=(16,6))
ax = fig.add_subplot(111, axisbelow=True)
ax.plot(t, S/N, 'b', alpha=0.5, lw=2, label='Susceptible')
ax.plot(t, I/N, 'r', alpha=0.5, lw=2, label='Infected')
ax.plot(t, R/N, 'g', alpha=0.5, lw=2, label='Recovered with immunity')
ax.set_xlabel('Time /days')
ax.set_ylabel('Fraction of population')
ax.set_ylim(0,1.2)
ax.yaxis.set_tick_params(length=0)
ax.xaxis.set_tick_params(length=0)
ax.grid(b=True, which='major', c='w', lw=2, ls='-')
legend = ax.legend()
legend.get_frame().set_alpha(0.5)
for spine in ('top', 'right', 'bottom', 'left'):
    ax.spines[spine].set_visible(False)
ax.set_title("Pima County, AZ Estimated Infections (Day 0 = 2020-02-12)")
plt.show()
In [ ]:
usc[usc.County.fillna('').str.contains('Pima')]

The estimate above suggests either that a significant number of cases had not yet been tested and confirmed, or that our model needs some fine-tuning. In either case, population flows from an infected community can introduce a nontrivial amount of risk to noninfected communities.